use super::{
    ActiveWorker, Attr, BoxedWorker, Config, Extensions, GroupSender, StopWorker, Task, TaskQueue,
    TaskRef,
};
use crate::channel::mpsc;
use hipthread::{self, JoinHandle, OnceLock};
use crate::Result;
use core::future::Future;
use core::result;
use core::sync::atomic::{AtomicBool, Ordering};
use hipool::{Arc, Boxed, MemPool, NullAlloc, PoolAlloc};

/// Sending half of a worker's signal channel; the messages are [`Signo`] values.
pub(crate) type MpscSender = mpsc::Sender<'static, Signo, MemPool>;
/// Receiving half of a worker's signal channel; the messages are [`Signo`] values.
pub(crate) type MpscReceiver = mpsc::Receiver<'static, Signo, MemPool>;

/// Control signals delivered to a worker through its mpsc channel.
#[allow(non_camel_case_types)]
pub enum Signo {
    /// Sent by `WorkerProxy::stop` right before it joins the worker thread.
    SIG_STOP,
    /// Carries the group's configured name; sent to each worker right after it is started.
    SIG_NAME(&'static str),
}

/// Process-wide table of runtime groups, indexed by the group's `u8` id.
struct GroupArray {
    // One write-once slot per possible `u8` id, hence 256 entries.
    groups: [OnceLock<ArcGroup>; 256],
}

// NOTE(review): this asserts that sharing the table between threads is sound;
// presumably it relies on `OnceLock` synchronizing initialization and on
// `ArcGroup` being safe to share -- confirm against hipthread/hipool.
unsafe impl Sync for GroupArray {}

impl GroupArray {
    /// Returns the lazily created, process-wide group table.
    fn instance() -> &'static Self {
        static GROUPS: OnceLock<GroupArray> = OnceLock::new();
        GROUPS.get_or_init(|| {
            // Every slot starts out empty and is filled at most once by `set`.
            let groups = core::array::from_fn(|_| OnceLock::new());
            GroupArray { groups }
        })
    }

    /// Looks up the group registered under `id`, if any.
    fn get(id: u8) -> Option<&'static ArcGroup> {
        let slot = &Self::instance().groups[usize::from(id)];
        slot.get()
    }

    /// Registers `init` in the slot selected by its own id.
    ///
    /// When the slot is already occupied the group is handed back in `Err`.
    fn set(init: ArcGroup) -> core::result::Result<(), ArcGroup> {
        let index = usize::from(init.id());
        Self::instance().groups[index].set(init)
    }
}

/// Handle kept by a `Group` for one worker thread it has started.
#[repr(C)]
pub(crate) struct WorkerProxy {
    // Index of this worker; selects its sender inside `mpscs`.
    id: usize,
    // Shared slice of signal senders, indexed by worker id.
    mpscs: ArcMpsc,
    // Set to `true` by the first call to `stop`, so later calls do nothing.
    stopped: AtomicBool,
    // Join handle of the worker thread; taken (left as `None`) when the worker is stopped.
    thread: Option<JoinHandle<()>>,
}

// NOTE(review): asserts that `&WorkerProxy` may be shared across threads; the
// shared-reference methods only send on the mpsc channel -- confirm that the
// sender and `JoinHandle` types justify this.
unsafe impl Sync for WorkerProxy {}

/// Pool-allocated slice of worker slots; a slot is `None` until its worker has been started.
type BoxedWorkers = Boxed<'static, [Option<WorkerProxy>], MemPool>;

impl WorkerProxy {
    /// Stops the worker: sends `SIG_STOP` and joins its thread.
    ///
    /// Only the first call has any effect; the join result is ignored.
    fn stop(&mut self) {
        let already_stopped = self.stopped.swap(true, Ordering::Relaxed);
        if already_stopped {
            return;
        }
        if let Some(handle) = self.thread.take() {
            self.signal(Signo::SIG_STOP);
            let _ = handle.join();
        }
    }

    /// Sends `sig` to this worker through its own channel.
    fn signal(&self, sig: Signo) {
        let sender = &self.mpscs[self.id];
        sender.send(sig);
    }

    /// Attempts to send `sig` to this worker via `try_send`; on failure the
    /// signal is handed back in `Err`.
    fn try_signal(&self, sig: Signo) -> result::Result<(), Signo> {
        let sender = &self.mpscs[self.id];
        sender.try_send(sig)
    }
}

impl Drop for WorkerProxy {
    fn drop(&mut self) {
        self.stop();
    }
}

/// Shared, pool-allocated slice of task queues (one per worker).
type ArcQueue = Arc<'static, [TaskQueue], MemPool>;
/// Shared, pool-allocated slice of signal senders (one per worker).
type ArcMpsc = Arc<'static, [MpscSender], MemPool>;

/// A runtime group: a set of workers together with their task queues and
/// signal channels, all allocated from one dedicated memory pool.
#[allow(dead_code)]
#[repr(C)]
pub struct Group {
    // Used by `spawn`/`sched` to hand a task to the queues when no specific queue is requested.
    sender: GroupSender,
    // Task queues, `conf.nth` entries.
    queues: ArcQueue,
    // Signal senders, `conf.nth` entries, indexed by worker id.
    mpscs: ArcMpsc,
    // Proxies of the started worker threads; slots stay `None` until `active_workers` fills them.
    workers: BoxedWorkers,
    // Normalized configuration (see `Group::conf`).
    conf: Config,
    // The pool from which the members above are allocated.
    pool: &'static MemPool,
    // Rust drops fields in declaration order; keeping the pool owner as the last
    // member ensures that everything above, which is allocated from the pool, is
    // dropped before the pool itself.
    pool_guard: Boxed<'static, MemPool, NullAlloc>,
}

// NOTE(review): asserts that `&Group` may be shared across threads -- confirm
// that every member is safe to access concurrently through a shared reference.
unsafe impl Sync for Group {}

/// Shared, pool-allocated handle to a [`Group`].
pub(crate) type ArcGroup = Arc<'static, Group, MemPool>;

impl Group {
    /// Builds the group described by `conf`, starts its workers and registers
    /// it in the global table.
    ///
    /// Returns `Ok(())` without doing anything when a group with the same id
    /// is already registered. If another thread registers the id first while
    /// this one is still building, the freshly built group is stopped again.
    ///
    /// # Errors
    ///
    /// Propagates allocation failures and failures to start a worker. Workers
    /// that were already started are stopped before the error is returned.
    pub(crate) fn build(conf: &Config) -> Result<()> {
        if GroupArray::get(conf.id).is_some() {
            return Ok(());
        }

        let (group, channels) = Self::new(conf)?;

        let group = Self::active_workers(group, channels)?;

        if let Err(mut group) = GroupArray::set(group) {
            // SAFETY: registration failed, so the group was never published and
            // nothing outside this function can reach it; calling
            // `get_mut_unchecked` is therefore safe.
            unsafe {
                group.get_mut_unchecked().stop();
            }
        }

        Ok(())
    }

    /// Allocates a group (without starting any worker) together with the
    /// channel halves that still have to be handed to the workers.
    fn new(conf: &Config) -> Result<(ArcGroup, Channels)> {
        let conf = Self::conf(conf);
        // Everything is allocated on the heap so that it can be `'static`.
        let (pool, layout, alloc) = MemPool::new_boxed(0)?.leak();
        // SAFETY: `pool`, `layout` and `alloc` are exactly the parts returned by
        // `leak()` on the line above.
        // NOTE(review): assumes `Boxed::from_with` is the inverse of `leak` -- confirm.
        let pool_guard = unsafe { Boxed::from_with(pool.into(), layout, alloc) };

        let nth = conf.nth;
        let queues = Arc::new_slice_then_in(pool, nth, |_| Ok(TaskQueue::new()))?;
        let mut channels = Channels::new(pool, nth, conf.qlen)?;
        let mpscs = Arc::new_slice_then_in(pool, nth, |n| Ok(channels.mpsc_send(n)))?;
        // Worker slots start out empty; `active_workers` fills them.
        let workers = Boxed::new_slice_then_in::<Option<WorkerProxy>, _>(pool, nth, |_| Ok(None))?;

        let group = Arc::new_in(
            pool,
            Self {
                sender: GroupSender::new(),
                queues,
                mpscs,
                workers,
                conf,
                pool,
                pool_guard,
            },
        )?;

        Ok((group, channels))
    }

    /// Starts `conf.nth` workers for `group` and stores their proxies in the group.
    ///
    /// # Errors
    ///
    /// If starting a worker fails, the workers started so far are stopped and
    /// joined before the error is returned. Otherwise they would keep running
    /// forever, each holding a reference to a group nobody can reach.
    fn active_workers(mut group: ArcGroup, mut channels: Channels) -> Result<ArcGroup> {
        if let Err(err) = Self::start_workers(&mut group, &mut channels) {
            // SAFETY: `GroupArray::set` has not been called yet, so the group is
            // not reachable from outside; this matches the reasoning used for the
            // stop on the registration-failure path in `build`.
            unsafe {
                group.get_mut_unchecked().stop();
            }
            return Err(err);
        }
        Ok(group)
    }

    /// Starts the workers one by one, recording a proxy for each started worker.
    /// Stops at the first failure and leaves the already recorded proxies in place.
    fn start_workers(group: &mut ArcGroup, channels: &mut Channels) -> Result<()> {
        for n in 0..group.conf.nth {
            let channel = channels.mpsc_recv(n);
            let queue = group.queues.clone();
            let thread = ActiveWorker::active(
                group.clone(),
                n as u16,
                group.conf.max_cache,
                channel,
                queue,
            )?;
            let proxy = WorkerProxy {
                id: n,
                mpscs: group.mpscs.clone(),
                thread: Some(thread),
                stopped: AtomicBool::new(false),
            };
            // Best effort: tell the worker the group's name; a failed send is ignored.
            let _ = proxy.try_signal(Signo::SIG_NAME(group.conf.name));
            // SAFETY: `GroupArray::set` has not been called yet, so the group is
            // not accessed from outside; calling `get_mut_unchecked` is safe here.
            unsafe {
                group.get_mut_unchecked().workers[n].replace(proxy);
            }
        }
        Ok(())
    }

    /// Creates the single worker (id 0) of a group that is driven by the
    /// calling thread instead of a dedicated worker thread.
    fn local_worker(group: ArcGroup, mut channels: Channels) -> Result<(ArcGroup, BoxedWorker)> {
        let channel = channels.mpsc_recv(0);
        let queue = group.queues.clone();
        let worker = ActiveWorker::new_in(
            group.pool,
            group.clone(),
            0,
            group.conf.max_cache,
            channel,
            queue,
        )?;
        Ok((group, worker))
    }
}

impl Group {
    /// Returns the runtime group registered under `id`.
    ///
    /// # Panics
    ///
    /// Panics if no group with this id has been registered yet.
    pub fn get(id: u8) -> &'static ArcGroup {
        // `Option::expect` takes a plain `&str` and never interpolates `{id}`,
        // so the id is formatted explicitly here.
        GroupArray::get(id).unwrap_or_else(|| panic!("Runtime {} is not initialized", id))
    }

    /// Creates a task for `future` and queues it on `group`.
    ///
    /// With `attr.hash == 0` the task is handed to the group sender; otherwise
    /// it is pinned to the queue selected by `attr.hash % queue count`.
    ///
    /// # Errors
    ///
    /// Propagates the error of `Task::new`.
    pub(crate) fn spawn<T: Future>(group: &ArcGroup, future: T, attr: &Attr) -> Result<TaskRef> {
        let mut task = Task::new(future, attr, group.clone())?;
        let queue = group.queues.as_ref();
        if attr.hash == 0 {
            group.sender.send(queue, task.clone());
        } else {
            let id = attr.hash % queue.len();
            // The worker count is capped well below `u16::MAX` in `get_nth`,
            // so the index always fits.
            task.status.set_local(id as u16);
            queue[id].push(task.clone());
        }
        Ok(task)
    }

    /// Queues `task` for execution: on the queue with index `local` when one
    /// is given, otherwise through the group sender.
    pub(crate) fn sched(&self, task: TaskRef, local: Option<u16>) {
        let queue = self.queues.as_ref();
        // Called very frequently; Rust offers no branch-prediction hint, so the
        // `None` case is kept first, as in the original branch order.
        match local {
            None => {
                self.sender.send(queue, task);
            }
            Some(id) => {
                queue[usize::from(id)].push(task);
            }
        }
    }

    /// Returns the id this group was configured with.
    pub(crate) fn id(&self) -> u8 {
        self.conf.id
    }

    /// Creates a private single-worker group on the calling thread, runs
    /// `future` on it and returns once the worker has finished.
    ///
    /// # Errors
    ///
    /// Propagates failures to allocate the group, the worker or the task.
    pub(crate) fn local_spawn<T: Future>(future: T, attr: &Attr) -> Result<TaskRef> {
        // Exactly one worker.
        let mut conf = Config::new(0);
        conf.nth = 1;
        conf.max_cache = attr.max_cache;

        // Create a group instance; `Group::workers` is irrelevant here because
        // no worker thread is started.
        let (group, channels) = Self::new(&conf)?;
        let (group, mut worker) = Self::local_worker(group, channels)?;

        // Create the entry task; `StopWorker` is sequenced after the future so
        // that the worker exits once the future has completed.
        let task = Task::new(future.seq(StopWorker), attr, group.clone())?;
        let queue = group.queues.as_ref();
        group.sender.send(queue, task.clone());

        // Start scheduling on the current thread.
        worker.run();

        Ok(task)
    }
}

impl Group {
    /// Stops every worker whose proxy is present in the worker slots.
    fn stop(&mut self) {
        for proxy in self.workers.iter_mut().flatten() {
            proxy.stop();
        }
    }

    /// Returns a copy of `conf` with `qlen` and `nth` normalized.
    fn conf(conf: &Config) -> Config {
        let qlen = Self::get_qlen(conf.qlen);
        let nth = Self::get_nth(conf.nth);
        let mut normalized = conf.clone();
        normalized.qlen = qlen;
        normalized.nth = nth;
        normalized
    }

    /// Queue length to use: `qlen` itself, or 8 when it is zero.
    fn get_qlen(qlen: usize) -> usize {
        match qlen {
            0 => 8,
            n => n,
        }
    }

    /// Worker count to use: the CPU count reported by `hipthread` (1 when it
    /// is unavailable) if `nth` is zero, otherwise `nth` capped at `u16::MAX >> 1`.
    fn get_nth(nth: usize) -> usize {
        const MAX_WORKERS: usize = (u16::MAX as usize) >> 1;
        match nth {
            0 => hipthread::sched_cpu_count().unwrap_or(1),
            n => n.min(MAX_WORKERS),
        }
    }
}

/// Both halves of the per-worker signal channels, held while a group is being assembled.
struct Channels {
    // Sender of each channel; `mpsc_send` hands out clones and leaves the slot filled.
    mpsc_send: Boxed<'static, [Option<MpscSender>], PoolAlloc>,
    // Receiver of each channel; `mpsc_recv` moves it out, leaving `None` behind.
    mpsc_recv: Boxed<'static, [Option<MpscReceiver>], PoolAlloc>,
}

impl Channels {
    /// Creates `nth` signal channels of capacity `qlen`, allocated from `pool`.
    ///
    /// # Errors
    ///
    /// Propagates allocation and channel-creation failures.
    fn new(pool: &'static MemPool, nth: usize, qlen: usize) -> Result<Self> {
        let mut mpsc_send =
            Boxed::new_slice_then_in::<Option<MpscSender>, _>(&PoolAlloc, nth, |_| Ok(None))?;
        let mut mpsc_recv =
            Boxed::new_slice_then_in::<Option<MpscReceiver>, _>(&PoolAlloc, nth, |_| Ok(None))?;

        // Fill both slot arrays pairwise, one channel per worker.
        let slots = mpsc_send.iter_mut().zip(mpsc_recv.iter_mut());
        for (send_slot, recv_slot) in slots {
            let (sender, receiver) = mpsc::channel_in(pool, qlen)?;
            *send_slot = Some(sender);
            *recv_slot = Some(receiver);
        }

        Ok(Self {
            mpsc_send,
            mpsc_recv,
        })
    }

    /// Moves the receiver of channel `n` out of its slot.
    ///
    /// Panics if it has already been taken.
    fn mpsc_recv(&mut self, n: usize) -> MpscReceiver {
        let slot = &mut self.mpsc_recv[n];
        slot.take().unwrap()
    }

    /// Returns a clone of the sender of channel `n`.
    fn mpsc_send(&mut self, n: usize) -> MpscSender {
        let sender = self.mpsc_send[n].as_ref().unwrap();
        sender.clone()
    }
}
